{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Random Forests Multi-node, Multi-GPU demo\n",
    "\n",
    "The experimental cuML multi-node, multi-GPU (MNMG) implementation of random forests leverages Dask to do embarrassingly-parallel model fitting. For a random forest with `N` trees being fit by `W` workers, each worker will build `N / W` trees. During inference, predictions from all `N` trees will be combined.\n",
    "\n",
    "The caller is responsible for partitioning the data efficiently via Dask. To build an accurate model, it's important to ensure that each worker has a representative chunk of the data. This can come by distributing the data evenly after ensuring that it is well shuffled. Or, given sufficient memory capacity, the caller can replicate the data to all workers. This approach will most closely simulate the single-GPU building approach.\n",
    "\n",
    "**Note:** cuML 0.9 contains the first, experimental preview release of the MNMG random forest model. The API is subject to change in future releases, and some known limitations remain (listed in the documentation).\n",
    "\n",
    "For more information on MNMG Random Forest models, see the documentation:\n",
    " * https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.ensemble.RandomForestClassifier\n",
    " * https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.ensemble.RandomForestRegressor"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import sklearn\n",
    "\n",
    "import pandas as pd\n",
    "import cudf\n",
    "import cuml\n",
    "\n",
    "from sklearn import model_selection\n",
    "\n",
    "from cuml import datasets\n",
    "from cuml.metrics import accuracy_score\n",
    "from cuml.dask.common import utils as dask_utils\n",
    "from dask.distributed import Client, wait\n",
    "from dask_cuda import LocalCUDACluster\n",
    "import dask_cudf\n",
    "\n",
    "from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF\n",
    "from sklearn.ensemble import RandomForestClassifier as sklRF"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start Dask cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This will use all GPUs on the local host by default\n",
    "cluster = LocalCUDACluster(threads_per_worker=1)\n",
    "c = Client(cluster)\n",
    "\n",
    "# Query the client for all connected workers\n",
    "workers = c.has_what().keys()\n",
    "n_workers = len(workers)\n",
    "n_streams = 8 # Performance optimization"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define Parameters\n",
    "\n",
    "In addition to the number of examples, random forest fitting performance depends heavily on the number of columns in a dataset and (especially) on the maximum depth to which trees are allowed to grow. Lower `max_depth` values can greatly speed up fitting, though going too low may reduce accuracy."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Data parameters\n",
    "train_size = 100000\n",
    "test_size = 1000\n",
    "n_samples = train_size + test_size\n",
    "n_features = 20\n",
    "\n",
    "# Random Forest building parameters\n",
    "max_depth = 12\n",
    "n_bins = 16\n",
    "n_trees = 1000"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Generate Data on host\n",
    "\n",
    "In this case, we generate data on the client (initial process) and pass it to the workers. You could also load data directly onto the workers via, for example, `dask_cudf.read_csv()`. See also the k-means MNMG notebook (kmeans_mnmg_demo.ipynb) for an alternative method of generating data on the worker nodes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X, y = datasets.make_classification(n_samples=n_samples, n_features=n_features,\n",
    "                                 n_clusters_per_class=1, n_informative=int(n_features / 3),\n",
    "                                 random_state=123, n_classes=5)\n",
    "X = X.astype(np.float32)\n",
    "y = y.astype(np.int32)\n",
    "X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=test_size)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Distribute data to worker GPUs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "n_partitions = n_workers\n",
    "\n",
    "def distribute(X, y):\n",
    "    # First convert to cudf (with real data, you would likely load in cuDF format to start)\n",
    "    X_cudf = cudf.DataFrame(X)\n",
    "    y_cudf = cudf.Series(y)\n",
    "\n",
    "    # Partition with Dask\n",
    "    # In this case, each worker will train on 1/n_partitions fraction of the data\n",
    "    X_dask = dask_cudf.from_cudf(X_cudf, npartitions=n_partitions)\n",
    "    y_dask = dask_cudf.from_cudf(y_cudf, npartitions=n_partitions)\n",
    "\n",
    "    # Persist to cache the data in active memory\n",
    "    X_dask, y_dask = \\\n",
    "      dask_utils.persist_across_workers(c, [X_dask, y_dask], workers=workers)\n",
    "    \n",
    "    return X_dask, y_dask\n",
    "\n",
    "X_train_dask, y_train_dask = distribute(X_train, y_train)\n",
    "X_test_dask, y_test_dask = distribute(X_test, y_test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build a scikit-learn model (single node)\n",
    "\n",
    "Dask does not currently have a simple wrapper for scikit-learn's RandomForest, but scikit-learn does offer multi-CPU support via joblib, which we'll use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "# Use all avilable CPU cores\n",
    "skl_model = sklRF(max_depth=max_depth, n_estimators=n_trees, n_jobs=-1)\n",
    "skl_model.fit(X_train.get(), y_train.get())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train the distributed cuML model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "cuml_model = cumlDaskRF(max_depth=max_depth, n_estimators=n_trees, n_bins=n_bins, n_streams=n_streams)\n",
    "cuml_model.fit(X_train_dask, y_train_dask)\n",
    "\n",
    "wait(cuml_model.rfs) # Allow asynchronous training tasks to finish"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Predict and check accuracy"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "skl_y_pred = skl_model.predict(X_test.get())\n",
    "cuml_y_pred = cuml_model.predict(X_test_dask).compute().to_numpy()\n",
    "\n",
    "# Due to randomness in the algorithm, you may see slight variation in accuracies\n",
    "print(\"SKLearn accuracy:  \", accuracy_score(y_test, skl_y_pred))\n",
    "print(\"CuML accuracy:     \", accuracy_score(y_test, cuml_y_pred))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
